/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.kylin.rest.service;

import java.io.IOException;
import java.net.InetAddress;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.metadata.model.ISourceAware;
import org.apache.kylin.metadata.project.ProjectInstance;
import org.apache.kylin.rest.constant.Constant;
import org.apache.kylin.rest.exception.InternalErrorException;
import org.apache.kylin.stream.coordinator.StreamMetadataStore;
import org.apache.kylin.stream.coordinator.StreamMetadataStoreFactory;
import org.apache.kylin.stream.core.model.CubeAssignment;
import org.apache.kylin.stream.core.model.ReplicaSet;
import org.apache.kylin.stream.coordinator.client.CoordinatorClient;
import org.apache.kylin.stream.coordinator.client.CoordinatorClientFactory;
import org.apache.kylin.stream.core.client.HttpReceiverAdminClient;
import org.apache.kylin.stream.core.client.ReceiverAdminClient;
import org.apache.kylin.stream.core.model.Node;
import org.apache.kylin.stream.core.model.StreamingCubeConsumeState;
import org.apache.kylin.stream.core.model.stats.ClusterState;
import org.apache.kylin.stream.core.model.stats.ConsumerStats;
import org.apache.kylin.stream.core.model.stats.CubeRealTimeState;
import org.apache.kylin.stream.core.model.stats.PartitionConsumeStats;
import org.apache.kylin.stream.core.model.stats.ReceiverCubeRealTimeState;
import org.apache.kylin.stream.core.model.stats.ReceiverCubeStats;
import org.apache.kylin.stream.core.model.stats.ReceiverState;
import org.apache.kylin.stream.core.model.stats.ReceiverStats;
import org.apache.kylin.stream.core.model.stats.ReplicaSetState;
import org.apache.kylin.stream.core.source.IStreamingSource;
import org.apache.kylin.stream.core.source.Partition;
import org.apache.kylin.stream.core.source.StreamingSourceConfig;
import org.apache.kylin.stream.core.source.StreamingSourceConfigManager;
import org.apache.kylin.stream.core.source.StreamingSourceFactory;
import org.apache.kylin.stream.core.util.NamedThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.stereotype.Component;

import org.apache.kylin.shaded.com.google.common.cache.Cache;
import org.apache.kylin.shaded.com.google.common.cache.CacheBuilder;
import org.apache.kylin.shaded.com.google.common.collect.Lists;
import org.apache.kylin.shaded.com.google.common.collect.Maps;
import org.apache.kylin.shaded.com.google.common.collect.Sets;

/**
 * StreamingCoordinatorService will try to forward request to coordinator leader.
 */
@Component("streamingServiceV2")
public class StreamingV2Service extends BasicService {
    private static final Logger logger = LoggerFactory.getLogger(StreamingV2Service.class);
    private static final String CLUSTER_STATE = "cluster_state";

    private StreamMetadataStore streamMetadataStore;

    private ReceiverAdminClient receiverAdminClient;

    private Cache<String, ClusterState> clusterStateCache = CacheBuilder.newBuilder()
            .expireAfterWrite(10, TimeUnit.SECONDS).build();

    private ExecutorService clusterStateExecutor = new ThreadPoolExecutor(0, 20, 60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(), new NamedThreadFactory("fetch_receiver_state"));

    public StreamingV2Service() {
        streamMetadataStore = StreamMetadataStoreFactory.getStreamMetaDataStore();
        receiverAdminClient = new HttpReceiverAdminClient();
    }

    StreamingV2Service(StreamMetadataStore metadataStore, ReceiverAdminClient adminClient) {
        streamMetadataStore = metadataStore;
        receiverAdminClient = adminClient;
    }

    public List<StreamingSourceConfig> listAllStreamingConfigs(final String table) throws IOException {
        List<StreamingSourceConfig> streamingSourceConfigs = Lists.newArrayList();
        if (StringUtils.isEmpty(table)) {
            streamingSourceConfigs = getStreamingManagerV2().listAllStreaming();
        } else {
            StreamingSourceConfig config = getStreamingManagerV2().getConfig(table);
            if (config != null) {
                streamingSourceConfigs.add(config);
            }
        }

        return streamingSourceConfigs;
    }

    public List<StreamingSourceConfig> getStreamingConfigs(final String table, final Integer limit, final Integer offset)
            throws IOException {
        List<StreamingSourceConfig> streamingSourceConfigs;
        streamingSourceConfigs = listAllStreamingConfigs(table);

        if (limit == null || offset == null) {
            return streamingSourceConfigs;
        }

        if ((streamingSourceConfigs.size() - offset) < limit) {
            return streamingSourceConfigs.subList(offset, streamingSourceConfigs.size());
        }

        return streamingSourceConfigs.subList(offset, offset + limit);
    }

    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN
            + " or hasPermission(#project, 'ADMINISTRATION')")
    public StreamingSourceConfig createStreamingConfig(StreamingSourceConfig config, ProjectInstance project) throws IOException {
        if (getStreamingManagerV2().getConfig(config.getName()) != null) {
            throw new InternalErrorException("The streamingSourceConfig named " + config.getName() + " already exists");
        }
        StreamingSourceConfig streamingSourceConfig = getStreamingManagerV2().saveStreamingConfig(config);
        return streamingSourceConfig;
    }

    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN)
    public StreamingSourceConfig updateStreamingConfig(StreamingSourceConfig config) throws IOException {
        return getStreamingManagerV2().updateStreamingConfig(config);
    }

    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN)
    public void dropStreamingConfig(StreamingSourceConfig config) throws IOException {
        getStreamingManagerV2().removeStreamingConfig(config);
    }

    public String getParserTemplate(final int sourceType, StreamingSourceConfig config) {
        IStreamingSource streamingSource = StreamingSourceFactory.getStreamingSource(new ISourceAware() {
            @Override
            public int getSourceType() {
                return sourceType;
            }

            @Override
            public KylinConfig getConfig() {
                return getConfig();
            }
        });
        return streamingSource.getMessageTemplate(config);
    }

    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN
            + " or hasPermission(#cube, 'ADMINISTRATION') or hasPermission(#cube, 'MANAGEMENT')")
    public List<CubeAssignment> getStreamingCubeAssignments(final CubeInstance cube) {
        if (cube == null) {
            return streamMetadataStore.getAllCubeAssignments();
        }
        List<CubeAssignment> result = Lists.newArrayList();
        CubeAssignment assignment = streamMetadataStore.getAssignmentsByCube(cube.getName());
        if (assignment != null) {
            result.add(assignment);
        }
        return result;
    }

    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN)
    public Map<Integer, Map<String, List<Partition>>> getStreamingReplicaSetAssignments(Integer replicaSetID) {
        if (replicaSetID == null) {
            return streamMetadataStore.getAllReplicaSetAssignments();
        }
        Map<Integer, Map<String, List<Partition>>> result = Maps.newHashMap();
        Map<String, List<Partition>> assignment = streamMetadataStore.getAssignmentsByReplicaSet(replicaSetID);
        if (assignment != null) {
            result.put(replicaSetID, assignment);
        }
        return result;
    }

    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN)
    public Map<Integer, Map<String, List<Partition>>> reBalancePlan() {
        return getCoordinatorClient().reBalanceRecommend();
    }

    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN)
    public void reBalance(Map<Integer, Map<String, List<Partition>>> reBalancePlan) {
        getCoordinatorClient().reBalance(reBalancePlan);
    }

    public List<String> getStreamingCubes() {
        return streamMetadataStore.getCubes();
    }

    public StreamingCubeConsumeState getStreamingCubeConsumeState(String cubeName) {
        return streamMetadataStore.getStreamingCubeConsumeState(cubeName);
    }

    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN
            + " or hasPermission(#cube, 'ADMINISTRATION') or hasPermission(#cube, 'MANAGEMENT')")
    public void assignCube(CubeInstance cube) {
        getCoordinatorClient().assignCube(cube.getName());
    }

    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN
            + " or hasPermission(#cube, 'ADMINISTRATION') or hasPermission(#cube, 'MANAGEMENT')")
    public void unAssignCube(CubeInstance cube) {
        getCoordinatorClient().unAssignCube(cube.getName());
    }

    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN)
    public void reAssignCube(String cubeName, CubeAssignment newAssignment) {
        validateAssignment(newAssignment);
        getCoordinatorClient().reAssignCube(cubeName, newAssignment);
    }

    void validateAssignment(CubeAssignment newAssignment) {
        Map<Integer, List<Partition>> assignments = newAssignment.getAssignments();
        Map<Integer, Set<Partition>> assignmentSet = assignments.keySet().stream().collect(
                Collectors.toMap(Function.identity(), x -> new HashSet<>(assignments.get(x))));

        Set<Integer> inputReplicaSetIDs = assignments.keySet();
        Set<Integer> allReplicaSetIDs = Sets.newHashSet(streamMetadataStore.getReplicaSetIDs());
        for (Integer inputReplicaSetID : inputReplicaSetIDs) {
            if (!allReplicaSetIDs.contains(inputReplicaSetID)) {
                throw new IllegalArgumentException("The replica set id:" + inputReplicaSetID + " does not exist");
            }

            Set<Partition> partitionSet = assignmentSet.get(inputReplicaSetID);
            if (partitionSet.isEmpty()) {
                throw new IllegalArgumentException("PartitionList is empty :" + inputReplicaSetID);
            }
            for (Map.Entry<Integer, Set<Partition>> entry : assignmentSet.entrySet()) {
                if (!entry.getKey().equals(inputReplicaSetID)) {
                    Set<Partition> anotherPartitionSet = entry.getValue();
                    int intersection = Sets.intersection(anotherPartitionSet, partitionSet).size();
                    if (intersection > 0) {
                        throw new IllegalArgumentException("Intersection detected between : " + inputReplicaSetID + " with " + entry.getKey());
                    }
                }
            }
        }
    }

    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN
            + " or hasPermission(#cube, 'ADMINISTRATION') or hasPermission(#cube, 'MANAGEMENT')")
    public void pauseConsumers(CubeInstance cube) {
        getCoordinatorClient().pauseConsumers(cube.getName());
    }

    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN
            + " or hasPermission(#cube, 'ADMINISTRATION') or hasPermission(#cube, 'MANAGEMENT')")
    public void resumeConsumers(CubeInstance cube) {
        getCoordinatorClient().resumeConsumers(cube.getName());
    }

    public StreamingSourceConfigManager getStreamingManagerV2() {
        return StreamingSourceConfigManager.getInstance(getConfig());
    }

    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN)
    public void removeCubeAssignment() {

    }

    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN)
    public List<Node> getReceivers() {
        List<Node> result = streamMetadataStore.getReceivers();
        return result;
    }

    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN)
    public void removeReceiver(Node receiver) {
        List<ReplicaSet> replicaSets = streamMetadataStore.getReplicaSets();
        for (ReplicaSet replicaSet : replicaSets) {
            Set<Node> receivers = replicaSet.getNodes();
            if (receivers != null && receivers.contains(receiver)) {
                throw new IllegalStateException("Before remove receiver, it must be firstly removed from replica set:"
                        + replicaSet.getReplicaSetID());
            }
        }
        streamMetadataStore.removeReceiver(receiver);
    }

    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN)
    public void createReplicaSet(ReplicaSet rs) {
        getCoordinatorClient().createReplicaSet(rs);
        clusterStateCache.invalidate(CLUSTER_STATE);
    }

    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN)
    public void removeReplicaSet(int rsID) {
        getCoordinatorClient().removeReplicaSet(rsID);
        clusterStateCache.invalidate(CLUSTER_STATE);
    }

    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN)
    public void addNodeToReplicaSet(Integer replicaSetID, String nodeID) {
        getCoordinatorClient().addNodeToReplicaSet(replicaSetID, nodeID);
        clusterStateCache.invalidate(CLUSTER_STATE);
    }

    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN)
    public void removeNodeFromReplicaSet(Integer replicaSetID, String nodeID) {
        getCoordinatorClient().removeNodeFromReplicaSet(replicaSetID, nodeID);
        clusterStateCache.invalidate(CLUSTER_STATE);
    }

    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN)
    public List<ReplicaSet> getReplicaSets() {
        List<ReplicaSet> result = streamMetadataStore.getReplicaSets();
        return result;
    }

    public ReceiverStats getReceiverStats(Node receiver) {
        try {
            return receiverAdminClient.getReceiverStats(receiver);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public ReceiverCubeStats getReceiverCubeStats(Node receiver, String cubeName) {
        try {
            return receiverAdminClient.getReceiverCubeStats(receiver, cubeName);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN
            + " or hasPermission(#cube, 'ADMINISTRATION') or hasPermission(#cube, 'MANAGEMENT') or hasPermission(#cube.getProjectInstance(), 'MANAGEMENT') or hasPermission(#cube.getProjectInstance(), 'ADMINISTRATION')")
    public CubeRealTimeState getCubeRealTimeState(CubeInstance cube) {
        CubeRealTimeState result = new CubeRealTimeState();
        result.setCubeName(cube.getName());
        CubeAssignment cubeAssignment = streamMetadataStore.getAssignmentsByCube(cube.getName());
        Map<Integer, Map<Node, ReceiverCubeRealTimeState>> rsReceiverCubeStateMap = Maps.newHashMap();
        for (Integer replicaSetID : cubeAssignment.getReplicaSetIDs()) {
            ReplicaSet replicaSet = streamMetadataStore.getReplicaSet(replicaSetID);
            Map<Node, ReceiverCubeRealTimeState> receiverCubeStateMap = Maps.newHashMap();
            Set<Node> receivers = replicaSet.getNodes();
            for (Node receiver : receivers) {
                ReceiverCubeRealTimeState receiverCubeRealTimeState = new ReceiverCubeRealTimeState();
                try {
                    ReceiverCubeStats receiverCubeStats = receiverAdminClient.getReceiverCubeStats(receiver,
                            cube.getName());
                    receiverCubeRealTimeState.setState(ReceiverState.State.HEALTHY);
                    receiverCubeRealTimeState.setReceiverCubeStats(receiverCubeStats);
                } catch (IOException e) {
                    logger.error("exception when get receiver cube stats", e);
                    if (!isReceiverReachable(receiver)) {
                        receiverCubeRealTimeState.setState(ReceiverState.State.UNREACHABLE);
                    } else {
                        receiverCubeRealTimeState.setState(ReceiverState.State.DOWN);
                    }
                }
                receiverCubeStateMap.put(receiver, receiverCubeRealTimeState);
            }
            rsReceiverCubeStateMap.put(replicaSetID, receiverCubeStateMap);
        }
        result.setReceiverCubeStateMap(rsReceiverCubeStateMap);

        return result;
    }

    /**
     * Fetch and calculate total cluster state.
     * @return
     */
    public ClusterState getClusterState() {
        ClusterState clusterState = clusterStateCache.getIfPresent(CLUSTER_STATE);
        if (clusterState != null) {
            return clusterState;
        }
        List<ReplicaSet> replicaSets = streamMetadataStore.getReplicaSets();
        List<Node> allReceivers = streamMetadataStore.getReceivers();
        Map<Integer, Map<String, List<Partition>>> rsAssignments = streamMetadataStore.getAllReplicaSetAssignments();

        Map<Node, Future<ReceiverStats>> statsFuturesMap = Maps.newHashMap();
        for (final Node receiver : allReceivers) {
            Future<ReceiverStats> receiverStatsFuture = clusterStateExecutor.submit(() -> receiverAdminClient.getReceiverStats(receiver));
            statsFuturesMap.put(receiver, receiverStatsFuture);
        }

        clusterState = new ClusterState();
        for (ReplicaSet replicaSet : replicaSets) {
            ReplicaSetState replicaSetState = calReplicaSetState(replicaSet,
                    rsAssignments.get(replicaSet.getReplicaSetID()), statsFuturesMap);
            clusterState.addReplicaSetState(replicaSetState);
            allReceivers.removeAll(replicaSet.getNodes());
        }

        // left receivers are not assigned receivers
        for (Node receiver : allReceivers) {
            Future<ReceiverStats> futureStats = statsFuturesMap.get(receiver);
            ReceiverState receiverState = getReceiverStateFromStats(receiver, futureStats);
            clusterState.addAvailableReveiverState(receiverState);
        }
        clusterState.setLastUpdateTime(System.currentTimeMillis());
        clusterStateCache.put("cluster_state", clusterState);
        return clusterState;
    }

    private ReplicaSetState calReplicaSetState(ReplicaSet replicaSet, Map<String, List<Partition>> rsAssignment,
            Map<Node, Future<ReceiverStats>> statsFuturesMap) {
        ReplicaSetState replicaSetState = new ReplicaSetState();
        replicaSetState.setRsID(replicaSet.getReplicaSetID());
        replicaSetState.setAssignment(rsAssignment);
        Set<Node> receivers = replicaSet.getNodes();
        if (receivers == null || receivers.isEmpty()) {
            return replicaSetState;
        }

        Node leadReceiver = replicaSet.getLeader();
        replicaSetState.setLead(leadReceiver);

        Map<Node, ReceiverStats> receiverStatsMap = Maps.newHashMap();
        for (Node receiver : receivers) {
            Future<ReceiverStats> futureStats = statsFuturesMap.get(receiver);
            try {
                ReceiverStats receiverStats = futureStats.get();
                receiverStatsMap.put(receiver, receiverStats);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } catch (ExecutionException e) {
                replicaSetState.addReveiverState(getReceiverStateFromException(receiver, e));
                continue;
            }
        }

        Map<String, Long> cubeLatestEventMap = Maps.newHashMap();
        for (ReceiverStats receiverStats : receiverStatsMap.values()) {
            Map<String, ReceiverCubeStats> cubeStatsMap = receiverStats.getCubeStatsMap();
            for (Map.Entry<String, ReceiverCubeStats> cubeStatsEntry : cubeStatsMap.entrySet()) {
                String cubeName = cubeStatsEntry.getKey();
                ReceiverCubeStats cubeStats = cubeStatsEntry.getValue();
                Long latestEventTime = cubeLatestEventMap.get(cubeName);
                if (latestEventTime == null || latestEventTime < cubeStats.getLatestEventTime()) {
                    cubeLatestEventMap.put(cubeName, cubeStats.getLatestEventTime());
                }
            }
        }
        long consumeEventLagThreshold = 5 * 60 * 1000L; // default lag warning threshold is 5 minutes
        for (Map.Entry<Node, ReceiverStats> receiverStatsEntry : receiverStatsMap.entrySet()) {
            Node receiver = receiverStatsEntry.getKey();
            ReceiverStats receiverStats = receiverStatsEntry.getValue();
            ReceiverState receiverState = new ReceiverState();
            receiverState.setReceiver(receiver);
            receiverState.setState(ReceiverState.State.HEALTHY);
            Map<String, List<Partition>> receiverAssignment = receiverStats.getAssignments();
            if (!assignmentEqual(receiverAssignment, rsAssignment)) {
                ReceiverState.State state = ReceiverState.State.WARN;
                receiverState.setState(state);
                receiverState.addInfo("assignment is inconsistent");
            }

            if (receiverStats.isLead() && !receiver.equals(leadReceiver)) {
                ReceiverState.State state = ReceiverState.State.WARN;
                receiverState.setState(state);
                receiverState.addInfo("lead state is inconsistent");
            }

            Map<String, ReceiverCubeStats> cubeStatsMap = receiverStats.getCubeStatsMap();
            for (Map.Entry<String, ReceiverCubeStats> cubeStatsEntry : cubeStatsMap.entrySet()) {
                String cubeName = cubeStatsEntry.getKey();
                ReceiverCubeStats cubeStats = cubeStatsEntry.getValue();
                Long latestEventTime = cubeLatestEventMap.get(cubeName);
                if ((latestEventTime - cubeStats.getLatestEventTime()) >= consumeEventLagThreshold) {
                    ReceiverState.State state = ReceiverState.State.WARN;
                    receiverState.setState(state);
                    receiverState.addInfo("cube:" + cubeName + " consuming is lagged");
                }
            }
            receiverState.setRateInOneMin(calConsumeRate(receiver, receiverStats));
            replicaSetState.addReveiverState(receiverState);
        }
        return replicaSetState;
    }

    private boolean assignmentEqual(Map<String, List<Partition>> receiverAssignment,
            Map<String, List<Partition>> rsAssignment) {
        if (emptyMap(receiverAssignment) && emptyMap(rsAssignment)) {
            return true;
        }

        if (receiverAssignment != null) {
            for (Map.Entry<String, List<Partition>> entry : receiverAssignment.entrySet()) {
                Collections.sort(entry.getValue());
                entry.setValue(entry.getValue());
            }
        }

        if (rsAssignment != null) {
            for (Map.Entry<String, List<Partition>> entry : rsAssignment.entrySet()) {
                Collections.sort(entry.getValue());
                entry.setValue(entry.getValue());
            }
        }

        if (receiverAssignment != null && receiverAssignment.equals(rsAssignment)) {
            return true;
        }
        return false;
    }

    private boolean emptyMap(Map map) {
        if (map == null || map.isEmpty()) {
            return true;
        }
        return false;
    }

    private ReceiverState getReceiverStateFromException(Node receiver, ExecutionException e) {
        ReceiverState receiverState = new ReceiverState();
        receiverState.setReceiver(receiver);
        if (!isReceiverReachable(receiver)) {
            receiverState.setState(ReceiverState.State.UNREACHABLE);
        } else {
            receiverState.setState(ReceiverState.State.DOWN);
        }
        return receiverState;
    }

    private boolean isReceiverReachable(Node receiver) {
        try {
            InetAddress address = InetAddress.getByName(receiver.getHost());//ping this IP
            boolean reachable = address.isReachable(1000);
            if (!reachable) {
                return false;
            }
            return true;
        } catch (Exception exception) {
            logger.error("exception when try ping host:" + receiver.getHost(), exception);
            return false;
        }
    }

    private ReceiverState getReceiverStateFromStats(Node receiver, Future<ReceiverStats> futureStats) {
        ReceiverState receiverState = new ReceiverState();
        try {
            ReceiverStats receiverStats = futureStats.get();
            receiverState.setReceiver(receiver);
            receiverState.setState(ReceiverState.State.HEALTHY);
            receiverState.setRateInOneMin(calConsumeRate(receiver, receiverStats));
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            receiverState = getReceiverStateFromException(receiver, e);
        }
        return receiverState;
    }

    private double calConsumeRate(Node receiver, ReceiverStats receiverStats) {
        double result = 0;
        Map<String, ReceiverCubeStats> cubeStatsMap = receiverStats.getCubeStatsMap();
        for (Map.Entry<String, ReceiverCubeStats> receiverCubeStatsEntry : cubeStatsMap.entrySet()) {
            ReceiverCubeStats cubeStats = receiverCubeStatsEntry.getValue();
            ConsumerStats consumerStats = cubeStats.getConsumerStats();
            if (consumerStats == null) {
                logger.warn("no consumer stats exist for cube:{} in receiver:{}", receiverCubeStatsEntry.getKey(),
                        receiver);
                continue;
            }
            Map<Integer, PartitionConsumeStats> partitionConsumeStatsMap = consumerStats.getPartitionConsumeStatsMap();
            for (PartitionConsumeStats partitionStats : partitionConsumeStatsMap.values()) {
                result += partitionStats.getOneMinRate();
            }
        }
        return result;
    }

    //    private Node.NodeStatus getReceiverStatus(Node receiver) {
    //        try {
    //            HealthCheckInfo healthCheckInfo = receiverAdminClient.healthCheck(receiver);
    //            if (healthCheckInfo.getStatus() == HealthCheckInfo.Status.GOOD) {
    //                return Node.NodeStatus.HEALTHY;
    //            } else {
    //                return Node.NodeStatus.STOPPED;
    //            }
    //        } catch (IOException e) {
    //            return Node.NodeStatus.STOPPED;
    //        }
    //    }

    private synchronized CoordinatorClient getCoordinatorClient() {
        return CoordinatorClientFactory.createCoordinatorClient(streamMetadataStore);
    }
}
